-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: abstract db for other services, move sheduler out of core #74
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
WalkthroughThis pull request introduces significant architectural changes to the core system, focusing on restructuring the MongoDB database interaction, task scheduling, and orchestrator management. The modifications include moving the Changes
Possibly related PRs
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (3)
packages/core/src/core/db/mongo-db.ts (1)
Line range hint
212-239
: Unsuccessful queries if_id
is stored asObjectId
.The code uses
{ _id: orchestratorId }
butorchestratorId
is a string. If the_id
in MongoDB is anObjectId
, this query will fail to match. Convert the string to anObjectId
or store_id
as a string consistently:await this.orchestratorCollection.updateOne( - { _id: orchestratorId }, + { _id: new ObjectId(orchestratorId) }, ... ); ... return this.orchestratorCollection.findOne({ - _id: orchestratorId, + _id: new ObjectId(orchestratorId), });Also applies to: 261-265, 268-283
packages/core/src/core/orchestrator.ts (1)
Line range hint
258-282
: EnsuregetOrchestratorById
returns the correct record.Because
_id
in Mongo might be stored as anObjectId
whileorchestratorId
is a string, this call risks returningnull
. Consider either storing_id
as a string or casting from string toObjectId
in the query. Otherwise, each new orchestrator attempt will create a new record unnecessarily.examples/example-server.ts (1)
Line range hint
228-252
: Inconsistent ID handling in API endpoints.The API endpoints still use ObjectId while the rest of the codebase has moved to string IDs. This inconsistency should be addressed.
- let objectId; - try { - objectId = new ObjectId(chatId); - } catch (err) { - return res.status(400).json({ error: "Invalid chat ID format" }); - } - - const history = await scheduledTaskDb.getOrchestratorById(objectId); + const history = await scheduledTaskDb.getOrchestratorById(chatId);
🧹 Nitpick comments (10)
packages/core/src/core/schedule-service.ts (3)
23-32
: Consider concurrency checks or leadership election for multiple service instances.If multiple
SchedulerService
instances run in parallel, there is a possibility that more than one instance may pick up the same tasks. To avoid duplicate processing, consider implementing a “leadership election” or adding a stronger guard (like an atomic “compare and set” on task status) to ensure that only one scheduler processes a pending task at a time.
90-119
: Validate scheduling parameters to prevent misconfigured intervals.If an end-user sets a negative or extremely large interval, it could result in scheduling anomalies. A quick validation step (e.g., ensuring
intervalMs >= 0
) helps avoid unexpected behavior.
120-125
: Consider graceful shutdown.Although
stop()
clears the polling interval, any in-flight tasks won’t be canceled. If you need a truly graceful shutdown, ensure you also wait for currently running tasks to finish before exiting your application.packages/core/src/core/db/mongo-db.ts (2)
133-133
: Use a "failed" status for errors.
Marking tasks as “completed” or “failed” is a good approach to categorize the final outcome. Make sure that any relevant logs or metrics reflect the difference so that failed tasks can be retried or inspected.
150-150
: Revert the status to “pending” only after verifying concurrency.If multiple service instances run, one instance could update the same task’s
nextRunAt
, while another instance is concurrently running it. Consider an additional check to confirm the current status before re-pending the task.packages/core/src/core/orchestrator.ts (1)
356-379
: Scheduling tasks here is consistent but verify concurrency approach.Creating tasks directly in the Orchestrator helps chain further actions. However, if multiple Orchestrator instances run concurrently, tasks may be scheduled more than once. Consider a uniqueness clause or deduplication approach to avoid duplicates for the same user/event.
packages/core/src/core/memory.ts (2)
33-34
: Consider adding error types for database operations.The connect and close methods could benefit from specific error types to handle different failure scenarios.
- connect(): Promise<void>; - close(): Promise<void>; + connect(): Promise<void | DatabaseConnectionError>; + close(): Promise<void | DatabaseCloseError>;
9-9
: Consider using a more specific type for taskData.Using
Record<string, any>
reduces type safety. Consider defining a specific interface or type for task data based on your use cases.- taskData: Record<string, any>; + taskData: TaskData; +interface TaskData { + // Define specific fields based on your use cases + [key: string]: string | number | boolean | object; +}examples/example-twitter.ts (1)
179-185
: Consider extracting magic numbers for intervals.The interval values should be defined as named constants at the top of the file for better maintainability.
+const MENTION_CHECK_INTERVAL_MS = 6000; // Check mentions every minute +const CONSCIOUSNESS_INTERVAL_MS = 30000; // Think every 5 minutes - await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); - await scheduler.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, 30000); + await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, MENTION_CHECK_INTERVAL_MS); + await scheduler.scheduleTaskInDb("sleever", "consciousness_thoughts", {}, CONSCIOUSNESS_INTERVAL_MS);examples/example-api.ts (1)
Line range hint
1-13
: Update file documentation to reflect architectural changes.The file header comments should be updated to:
- Remove Twitter-specific references since this is now a general API example
- Document the database abstraction layer usage
Apply this diff to update the documentation:
/** - * Example demonstrating a Twitter bot using the Daydreams package. + * Example demonstrating a general API integration using the Daydreams package. * This bot can: - * - Monitor Twitter mentions and auto-reply - * - Generate autonomous thoughts and tweet them + * - Process API requests and auto-reply + * - Generate autonomous thoughts * - Maintain conversation memory using ChromaDB + * - Store scheduled tasks using MongoDB * - Process inputs through a character-based personality */
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
examples/example-api.ts
(1 hunks)examples/example-discord.ts
(1 hunks)examples/example-server.ts
(2 hunks)examples/example-twitter.ts
(8 hunks)packages/core/src/core/db/mongo-db.ts
(9 hunks)packages/core/src/core/index.ts
(2 hunks)packages/core/src/core/memory.ts
(1 hunks)packages/core/src/core/orchestrator.ts
(14 hunks)packages/core/src/core/schedule-service.ts
(1 hunks)packages/core/src/core/task-scheduler.ts
(0 hunks)
💤 Files with no reviewable changes (1)
- packages/core/src/core/task-scheduler.ts
✅ Files skipped from review due to trivial changes (1)
- examples/example-discord.ts
🔇 Additional comments (13)
packages/core/src/core/db/mongo-db.ts (3)
2-2
: No issues found.
This import line is correct and maps toHandlerRole
from../types
.
75-79
: Be cautious storing_id
as a string in tasks.You are manually overriding the
_id
field withnew ObjectId().toString()
. While valid, this diverges from typical Mongo usage (where_id
isObjectId
). Ensure that queries match this string-based_id
, and be mindful of potential consistency issues if other parts of the code expectObjectId
s.
117-117
: Atomic updates look good.By using
$set
with{_id: taskId}
, the code ensures consistent status updates for “running” tasks. Keep in mind that concurrency conflicts could arise if another scheduler tries to pick up the same task at the same time—assuming it's the intended design, you may want to use options like{upsert: false}
or check themodifiedCount
.packages/core/src/core/orchestrator.ts (2)
Line range hint
56-65
: Check ID consistency with OrchestratorDb.The constructor now receives
OrchestratorDb
, which implies string-based IDs. Ensure each place referencing an Orchestrator ID uses a string or properly casts to/from anObjectId
so that queries match records in Mongo.
313-315
: Logging and message additions look good.Providing rich logs and storing messages for each step is beneficial for observability and debugging. Ensure sensitive data is properly sanitized before logging, if applicable.
Also applies to: 410-435
packages/core/src/core/index.ts (1)
19-19
: LGTM! Clean replacement of TaskScheduler with SchedulerService.The changes maintain the module's public API while moving the scheduler functionality out of core, aligning with the PR objectives.
Also applies to: 40-40
packages/core/src/core/memory.ts (1)
4-73
: Well-structured interfaces with clear separation of concerns.The interfaces provide a solid foundation for abstracting database operations and memory management.
examples/example-server.ts (2)
20-20
: LGTM! Import path updated to reflect new directory structure.The change is consistent with the architectural changes moving the MongoDb class.
154-154
: LGTM! Simplified orchestratorId handling.The change to use string IDs directly improves consistency and removes unnecessary ObjectId wrapping.
examples/example-twitter.ts (2)
76-92
: Well-structured scheduler initialization with proper configuration.Good separation of concerns and clear configuration of the scheduler service.
226-229
: LGTM! Clean handler removal during shutdown.The shutdown process properly removes all registered handlers.
examples/example-api.ts (2)
23-23
: LGTM! Import path updated correctly.The import path change aligns with the PR objective of abstracting database implementations.
Line range hint
67-71
: Consider using a database interface for better abstraction.Since the PR aims to abstract the database layer, consider updating the example to demonstrate how to use alternative database implementations.
Let's verify if an interface or base class exists for database implementations:
Also applies to: 73-74
private async pollTasks() { | ||
try { | ||
const tasks = await this.context.orchestratorDb.findDueTasks(); | ||
for (const task of tasks) { | ||
await this.context.orchestratorDb.markRunning(task._id); | ||
|
||
const handler = this.orchestrator.getHandler(task.handlerName); | ||
if (!handler) { | ||
this.context.logger.warn("No handler found", "warn", { | ||
name: task.handlerName, | ||
}); | ||
continue; | ||
} | ||
|
||
// parse out data | ||
const data = JSON.parse(task.taskData.task_data); | ||
|
||
switch (handler.role) { | ||
case HandlerRole.INPUT: | ||
await this.orchestrator.dispatchToInput( | ||
task.handlerName, | ||
data, | ||
task.userId | ||
); | ||
break; | ||
case HandlerRole.ACTION: | ||
await this.orchestrator.dispatchToAction( | ||
task.handlerName, | ||
data | ||
); | ||
break; | ||
case HandlerRole.OUTPUT: | ||
await this.orchestrator.dispatchToOutput( | ||
task.handlerName, | ||
data | ||
); | ||
break; | ||
} | ||
|
||
// handle recurring or complete | ||
if (task.intervalMs) { | ||
await this.context.orchestratorDb.updateNextRun( | ||
task._id, | ||
new Date(Date.now() + task.intervalMs) | ||
); | ||
} else { | ||
await this.context.orchestratorDb.markCompleted(task._id); | ||
} | ||
} | ||
} catch (err) { | ||
this.context.logger.error("pollTasks error", "error", { | ||
data: err, | ||
}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle task failures with a fallback flow.
If an error occurs after calling markRunning
but before calling markCompleted
, the task remains stuck in the “running” state. You may want to:
- Wrap each handler dispatch in its own
try/catch
block and mark the task as failed if an exception occurs. - Consider a “retry” mechanism or a “failed” status to differentiate between normal completions and errors.
for (const task of tasks) {
try {
+ await this.context.orchestratorDb.markRunning(task._id);
const handler = this.orchestrator.getHandler(task.handlerName);
...
+ } catch (error) {
+ await this.context.orchestratorDb.markCompleted(task._id, true);
+ this.context.logger.error("pollTasks error", "error", { error });
+ }
}
Committable suggestion skipped: line range outside the PR's diff.
public async createOrchestrator(userId: string): Promise<string> { | ||
const chat: OrchestratorChat = { | ||
userId: userId, | ||
createdAt: new Date(), | ||
updatedAt: new Date(), | ||
messages: [], | ||
}; | ||
const result = await this.orchestratorCollection.insertOne(chat); | ||
return result.insertedId; | ||
return result.insertedId.toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential mismatch between _id
storage for orchestrators and tasks.
Here, you rely on MongoDB’s default _id
(an ObjectId
) and convert it to a string, whereas for tasks you store a string-based _id
directly in the document. Make sure to consistently store _id
as either an ObjectId
or a string across your collections to avoid confusion and difficulty in referencing records.
const chat: OrchestratorChat = {
+ _id: new ObjectId().toString(),
userId: userId,
...
};
const result = await this.orchestratorCollection.insertOne(chat);
- return result.insertedId.toString();
+ return chat._id;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
public async createOrchestrator(userId: string): Promise<string> { | |
const chat: OrchestratorChat = { | |
userId: userId, | |
createdAt: new Date(), | |
updatedAt: new Date(), | |
messages: [], | |
}; | |
const result = await this.orchestratorCollection.insertOne(chat); | |
return result.insertedId; | |
return result.insertedId.toString(); | |
public async createOrchestrator(userId: string): Promise<string> { | |
const chat: OrchestratorChat = { | |
_id: new ObjectId().toString(), | |
userId: userId, | |
createdAt: new Date(), | |
updatedAt: new Date(), | |
messages: [], | |
}; | |
const result = await this.orchestratorCollection.insertOne(chat); | |
return chat._id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (11)
packages/core/src/core/processors/master-processor.ts (4)
17-33
: Use a constructor-level comment or docstring to clarify usage and dependencies.The constructor’s parameters are self-explanatory but adding a short docstring clarifying how the
character
,llmClient
, and other fields fit together would improve maintainability and onboarding.
39-46
: Ensure alignment ofcanHandle
logic withcontentLimit
.Currently,
canHandle
checks if the content length is< 1000
, matching the default in the base class constructor. If you allow further changes tocontentLimit
, consider referencingthis.contentLimit
explicitly in the docstring or method name for clarity.
215-221
: Leverage optional chaining to reduce complexity.Static analysis suggests using an optional chain for safety. However, your current check is still valid. If you prefer a fluent style, consider this refactor:
- if (childProcessor && childProcessor.canHandle(content)) { + if (childProcessor?.canHandle(content)) {🧰 Tools
🪛 Biome (1.9.4)
[error] 221-221: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
238-285
: Provide more nuanced fallback handling.When an error occurs, the fallback response always returns neutral sentiment without giving the user enough context. Consider capturing partial results (if any) or providing an error field to alert upstream consumers.
packages/core/src/core/processor.ts (5)
10-11
: Encourage a typed record for child processors.While
Map<string, BaseProcessor>
is adequate, using a typed object literal or an expanded interface might enable clearer relationships and reduce possible runtime errors.
15-17
: Constructor parameters could benefit from partial typing or defaults.You removed the default for
loggerLevel
. If there are no strict requirements that it must always be provided, consider a default to avoid collisions in other places that instantiateBaseProcessor
.
34-39
: Expand docstring to convey the intended usage ofgetDescription
.This method currently returns the processor’s description. Documenting when and how it might be used (e.g., debugging, generating help text) helps future maintainers.
58-72
: Validate child processors’ compatibility prior to adding.
addProcessor
throws an error if a processor name exists. That’s a good safeguard. Additionally, consider verifying that child processors have unique roles or are valid for delegation, so you avoid hidden conflicts in the chain of responsibility.
74-79
: Provide fallback or log if the requestedname
does not exist.When
getProcessor
is called with a name that does not exist, returningundefined
might silence debugging signals. Logging an info or debug message could help trace errors faster in large deployments.packages/core/src/core/orchestrator.ts (2)
218-225
: Move debug logging before action execution.The debug logging should be placed before executing the action to ensure we capture the input context even if the action fails.
try { + this.logger.debug( + "Orchestrator.dispatchToAction", + "Executing action", + { + name, + data, + } + ); const result = await handler.execute(data); - this.logger.debug( - "Orchestrator.dispatchToAction", - "Executing action", - { - name, - data, - } - ); return result;
346-369
: Extract task scheduling logic into a separate method.The task scheduling logic could be extracted into a dedicated method for better maintainability and reusability.
+ private async scheduleTask( + userId: string, + task: { name: string; data: any; intervalMs?: number } + ): Promise<void> { + const now = Date.now(); + const nextRunAt = new Date(now + (task.intervalMs ?? 0)); + + this.logger.info( + "Orchestrator.scheduleTask", + `Scheduling task ${task.name}`, + { + nextRunAt, + intervalMs: task.intervalMs, + } + ); + + await this.orchestratorDb.createTask( + userId, + task.name, + { + request: task.name, + task_data: JSON.stringify(task.data), + }, + nextRunAt, + task.intervalMs + ); + } // In runAutonomousFlow: - const now = Date.now(); - const nextRunAt = new Date(now + (task.intervalMs ?? 0)); - - this.logger.info( - "Orchestrator.runAutonomousFlow", - `Scheduling task ${task.name}`, - { - nextRunAt, - intervalMs: task.intervalMs, - } - ); - - await this.orchestratorDb.createTask( - userId, - task.name, - { - request: task.name, - task_data: JSON.stringify(task.data), - }, - nextRunAt, - task.intervalMs - ); + await this.scheduleTask(userId, task);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
examples/example-api.ts
(3 hunks)examples/example-discord.ts
(3 hunks)examples/example-server.ts
(3 hunks)examples/example-twitter.ts
(9 hunks)packages/core/src/core/orchestrator.ts
(18 hunks)packages/core/src/core/processor.ts
(2 hunks)packages/core/src/core/processors/index.ts
(1 hunks)packages/core/src/core/processors/master-processor.ts
(1 hunks)packages/core/src/core/types/index.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- examples/example-api.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/core/processors/master-processor.ts
[error] 221-221: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
🔇 Additional comments (14)
packages/core/src/core/processors/master-processor.ts (1)
48-55
: ValidateotherContext
to avoid potential unhandled edge cases.If
otherContext
is empty or has unexpected data types, it could lead to unexpected behaviors within the prompt. Consider input validation or at least logging a warning ifotherContext
is not a string.packages/core/src/core/processors/index.ts (1)
3-3
: ExportingMasterProcessor
.This addition successfully exposes
MasterProcessor
from the module, aligning with other processor exports. No concerns here.packages/core/src/core/processor.ts (1)
53-54
: Optional properties forioContext
are helpful for flexibility.Updating these properties to optional is a good step to enhance reusability. Ensure any calling code gracefully handles
undefined
outputs or actions.packages/core/src/core/types/index.ts (1)
288-288
: Verify processor chain handling.The addition of
nextProcessor
enables dynamic processor chaining, but we should ensure proper handling of processor chains to prevent potential issues.Run this script to check for potential circular dependencies in processor chains:
✅ Verification successful
No processor chain implementation concerns found
The
nextProcessor
property is currently unused and optional. The existing processor management system has proper safeguards against duplicate processors, and the current implementation uses a simple delegation pattern without complex chaining.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for processor chain implementations to verify proper handling # Look for processor chain implementations echo "Searching for processor chain implementations..." rg -A 10 "nextProcessor.*=.*" # Look for circular chain prevention logic echo "Searching for circular chain prevention..." rg -A 10 "addProcessor|getProcessor"Length of output: 4945
examples/example-discord.ts (1)
43-57
: LGTM! Clean implementation of the new processor architecture.The initialization and registration of processors follow a clear pattern:
- Create master processor
- Create message processor
- Register message processor with master processor
examples/example-server.ts (2)
55-69
: LGTM! Consistent implementation of the processor architecture.The processor initialization follows the same pattern as other examples, maintaining consistency across the codebase.
163-163
: Verify the impact of orchestratorId type change.The orchestratorId is now passed directly without wrapping in
ObjectId
. Ensure this change is compatible with MongoDB operations.✅ Verification successful
The orchestratorId type change is safe and compatible
The codebase consistently uses string type for orchestratorId across all MongoDB operations (find, insert, update). The removal of ObjectId wrapping aligns with the existing implementation in the database layer.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check MongoDB operations with orchestratorId # Search for MongoDB operations using orchestratorId rg -A 5 "orchestratorId.*=|getOrchestratorById"Length of output: 2994
Script:
#!/bin/bash # Check for ObjectId usage and other MongoDB operations rg -l "ObjectId|new Mongo" && \ ast-grep --pattern 'import { ObjectId } from "mongodb"' && \ rg "orchestratorCollection.*find|orchestratorCollection.*update|orchestratorCollection.*insert"Length of output: 966
examples/example-twitter.ts (2)
46-60
: LGTM! Consistent processor architecture implementation.The processor initialization maintains consistency with other examples in the codebase.
85-101
: Well-structured scheduler implementation with proper configuration.The SchedulerService is initialized with:
- Appropriate logger configuration
- Required dependencies
- Reasonable polling interval (10 seconds)
packages/core/src/core/orchestrator.ts (5)
26-29
: LGTM! Well-documented database abstraction.The introduction of
OrchestratorDb
type and its documentation improves the modularity of the system by abstracting database operations.
68-71
: LGTM! Enhanced logging initialization.The addition of initialization logging helps with debugging and monitoring the orchestrator's lifecycle.
134-136
: LGTM! Improved handler management logging.The structured logging in
removeIOHandler
enhances debugging capabilities by providing clear context about handler removal operations.
564-570
: LGTM! Well-documented content processing logic.The comprehensive documentation clearly explains the processing steps and improves code maintainability.
36-40
: Verify unsubscriber cleanup in all scenarios.The
unsubscribers
map stores cleanup functions for handlers. Ensure that all scenarios (error cases, re-initialization, etc.) properly clean up these references to prevent memory leaks.✅ Verification successful
The unsubscriber cleanup implementation is correct and complete
The code properly manages unsubscriber lifecycle:
- Unsubscribers are created only for input handlers with subscribe functionality
- Each unsubscriber is properly cleaned up in removeIOHandler before the handler is removed
- Error handling and logging are in place throughout the implementation
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for all places where unsubscribers are added but not cleaned up rg -A 5 "this\.unsubscribers\.set" . rg -A 5 "this\.unsubscribers\.delete" .Length of output: 892
Script:
#!/bin/bash # Search for cleanup methods and error handling ast-grep --pattern 'class Orchestrator { $$$ destroy() { $$$ } $$$ }' # Look for shutdown or cleanup methods rg -B 5 "unsubscribers\.clear" . # Check error handling in registration rg -A 10 "registerIOHandler.*async" .Length of output: 158
Script:
#!/bin/bash # Get the full implementation of registerIOHandler and related methods ast-grep --pattern 'class Orchestrator { $$$ registerIOHandler($_) { $$$ } $$$ }' # Look for any cleanup-related methods rg -A 5 "cleanup|dispose|destroy|shutdown" packages/core/src/core/orchestrator.ts # Check error handling around unsubscribers rg -A 5 "try.*{.*unsubscribers" packages/core/src/core/orchestrator.tsLength of output: 362
Script:
#!/bin/bash # Get the full content of orchestrator.ts cat packages/core/src/core/orchestrator.ts # Look for try-catch blocks rg "try\s*{" -A 5 packages/core/src/core/orchestrator.ts # Look for cleanup patterns with fixed regex rg "(cleanup|dispose|destroy|shutdown)" packages/core/src/core/orchestrator.tsLength of output: 22198
await scheduler.scheduleTaskInDb("sleever", "twitter_mentions", {}, 6000); // Check mentions every minute | ||
await scheduler.scheduleTaskInDb( | ||
"sleever", | ||
"consciousness_thoughts", | ||
{}, | ||
30000 | ||
); // Think every 5 minutes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Adjust task scheduling intervals to prevent rate limiting
The current intervals are too aggressive and could hit Twitter API rate limits:
- Mentions check every 6 seconds could make up to 150 requests per 15-minute window, dangerously close to Twitter's 180 requests limit
- Recommended changes:
- Increase mentions check to at least 60 seconds (
60000ms
) - the default interval in the Twitter client - Keep consciousness thoughts at 30 seconds or higher since it's not API-intensive
- Increase mentions check to at least 60 seconds (
🔗 Analysis chain
Verify task scheduling intervals.
The scheduling intervals seem quite aggressive:
- Mentions check: 6 seconds
- Consciousness thoughts: 30 seconds
Consider if these intervals might cause rate limiting issues with Twitter's API.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check Twitter API rate limit handling
# Search for rate limit handling in Twitter client
rg -A 10 "rate.*limit|RateLimit"
Length of output: 782
Script:
#!/bin/bash
# Find Twitter-related files and check their content
echo "=== Twitter Files ==="
fd -t f twitter -I
echo -e "\n=== Twitter API Usage ==="
rg -i "twitter.*api|api.*twitter" -A 5
echo -e "\n=== Mentions Implementation ==="
rg "mentions" -A 10
Length of output: 18153
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
packages/core/src/core/processors/chain-of-thought-processor.ts (3)
8-31
: Encapsulate or validate memory usage for reliable DB abstraction.
The constructor injectsmemory: VectorDB
directly into theChainOfThought
instance. While this is consistent with the new DB abstraction approach, consider adding validation or error handling if the injectedVectorDB
fails to initialize or has limited availability (e.g., unavailable network resources). Encapsulating these checks can improve the processor’s resilience and clarify the system’s resource requirements.
33-49
: Validate textual content more robustly incanHandle
.
ThecanHandle
method checks for keywords ("goal", "plan", etc.) and content length but does not account for edge cases like extremely large or non-textual input (e.g., binary data, HTML). For broader usage, consider implementing defensive checks or limiting memory usage for large inputs.
211-218
: Extend error information when rethrowing.
While rethrowing the error preserves the stack trace, consider adding context about which content triggered the failure for extended observability. This helps pinpoint the cause of repeated errors in logs or incident reports.packages/core/src/core/chain-of-thought.ts (1)
31-31
: Reevaluate makingstepManager
public.
ThestepManager
property was changed to public, potentially exposing it to outside modifications. If external classes only need read access, consider a getter or read-only property. This preserves encapsulation while still supporting your new scheduler architecture and DB abstraction.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/core/src/core/chain-of-thought.ts
(1 hunks)packages/core/src/core/processors/chain-of-thought-processor.ts
(1 hunks)
🔇 Additional comments (1)
packages/core/src/core/processors/chain-of-thought-processor.ts (1)
51-73
: Check for null/undefinedioContext
references before usage.
Although the code checksif (ioContext)
at line 66, ensure future expansions account for partial definitions (e.g.,ioContext
exists butavailableOutputs
is missing). Using optional chaining or additional checks avoids potential runtime errors if future refactors alter the shape ofioContext
.
try { | ||
// First, decompose the content into goals | ||
await this.chainOfThought.decomposeObjectiveIntoGoals( | ||
typeof content === "string" ? content : JSON.stringify(content) | ||
); | ||
|
||
const stats = { | ||
completed: 0, | ||
failed: 0, | ||
total: 0, | ||
}; | ||
|
||
// Execute goals until completion | ||
while (true) { | ||
const readyGoals = | ||
this.chainOfThought.goalManager.getReadyGoals(); | ||
const activeGoals = this.chainOfThought.goalManager | ||
.getGoalsByHorizon("short") | ||
.filter((g) => g.status === "active"); | ||
const pendingGoals = this.chainOfThought.goalManager | ||
.getGoalsByHorizon("short") | ||
.filter((g) => g.status === "pending"); | ||
|
||
// Log progress | ||
this.logger.debug( | ||
"ChainOfThoughtProcessor.process", | ||
"Goal execution progress:", | ||
JSON.stringify({ | ||
ready: readyGoals.length, | ||
active: activeGoals.length, | ||
pending: pendingGoals.length, | ||
completed: stats.completed, | ||
failed: stats.failed, | ||
}) | ||
); | ||
|
||
// Check if all goals are complete | ||
if ( | ||
readyGoals.length === 0 && | ||
activeGoals.length === 0 && | ||
pendingGoals.length === 0 | ||
) { | ||
this.logger.debug( | ||
"ChainOfThoughtProcessor.process", | ||
"All goals completed!", | ||
{ | ||
ready: readyGoals.length, | ||
active: activeGoals.length, | ||
pending: pendingGoals.length, | ||
completed: stats.completed, | ||
failed: stats.failed, | ||
} | ||
); | ||
break; | ||
} | ||
|
||
// Handle blocked goals | ||
if (readyGoals.length === 0 && activeGoals.length === 0) { | ||
this.logger.warn( | ||
"ChainOfThoughtProcessor.process", | ||
"No ready or active goals, but some goals are pending", | ||
{ | ||
pending: pendingGoals.length, | ||
} | ||
); | ||
pendingGoals.forEach((goal) => { | ||
const blockingGoals = | ||
this.chainOfThought.goalManager.getBlockingGoals( | ||
goal.id | ||
); | ||
this.logger.warn( | ||
"ChainOfThoughtProcessor.process", | ||
`Pending Goal: ${goal.description}`, | ||
{ | ||
blockedBy: blockingGoals.length, | ||
} | ||
); | ||
}); | ||
break; | ||
} | ||
|
||
// Execute next goal | ||
try { | ||
await this.chainOfThought.processHighestPriorityGoal(); | ||
stats.completed++; | ||
} catch (error) { | ||
this.logger.error( | ||
"ChainOfThoughtProcessor.process", | ||
"Goal execution failed:", | ||
error | ||
); | ||
stats.failed++; | ||
} | ||
|
||
stats.total++; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding a time-limit or iteration cap to avoid potential infinite loops.
The while loop at line 88 breaks when goals are completed or none are ready, but there's a possibility of stalling if goals remain in a conflicting state. Incorporating a maximum iteration or a time-based cutoff helps avoid indefinite processing under unexpected conditions. Furthermore, providing logs or system events when the loop terminates prematurely can aid in debugging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
packages/core/src/core/orchestrator.ts (1)
Line range hint
502-529
: Improve error handling in dispatchToInput.The error handling could be more informative and consistent with the rest of the codebase:
try { const result = await handler.execute(data); if (result) { return await this.runAutonomousFlow( result, handler.name, userId, orchestratorId ); } return []; } catch (error) { - this.logger.error( - "dispatchToInput Error", - `dispatchToInput Error: ${ - error instanceof Error ? error.message : String(error) - }` - ); + this.logger.error( + "Orchestrator.dispatchToInput", + "Failed to execute input handler", + { + name, + error: error instanceof Error ? error.message : String(error), + userId, + orchestratorId + } + ); + throw error; // Re-throw to allow proper error handling upstream }
🧹 Nitpick comments (7)
packages/core/src/core/processors/master-processor.ts (4)
17-33
: Consider enhancing logging and description for better observability.As this is a master processor responsible for content delegation:
- Consider using LogLevel.INFO as the default to improve observability of routing decisions
- Enhance the description to better reflect its role in content delegation and processing orchestration
- logLevel: LogLevel = LogLevel.ERROR + logLevel: LogLevel = LogLevel.INFO ) { super( { name: "master", description: - "This processor handles messages or short text inputs.", + "Master processor that analyzes content, delegates to specialized processors, and orchestrates processing flows for messages and short text inputs.", },
48-115
: Extract prompt construction to improve maintainability.The prompt construction logic is complex and could benefit from being moved to a separate method or configuration file. This would make it easier to maintain and test the prompt template independently.
+ protected getProcessingPrompt( + content: string, + otherContext: string, + processorContext: string, + outputsSchemaPart?: string, + actionsSchemaPart?: string + ): string { + return `You are a master processor...`; // Move the entire prompt template here + } + async process( content: any, otherContext: string, ioContext?: { availableOutputs: OutputIOHandler[]; availableActions: ActionIOHandler[]; } ): Promise<ProcessedResult> {
199-219
: Use optional chaining as suggested by static analysis.The code could benefit from using optional chaining for better readability and safety.
- if (result.classification.delegateToProcessor) { - const childProcessor = this.getProcessor( - result.classification.delegateToProcessor - ); - if (childProcessor && childProcessor.canHandle(content)) { + const childProcessor = result.classification.delegateToProcessor + ? this.getProcessor(result.classification.delegateToProcessor) + : null; + if (childProcessor?.canHandle(content)) {🧰 Tools
🪛 Biome (1.9.4)
[error] 204-204: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
246-248
: Enhance error logging for better debugging.The error logging could be more detailed to help with debugging processing failures.
- this.logger.error("Processor.process", "Processing failed", { - error, - }); + this.logger.error("Processor.process", "Processing failed", { + error, + content: typeof content === 'string' ? content : JSON.stringify(content), + errorMessage: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined + });packages/core/src/core/orchestrator.ts (3)
46-53
: Consider reordering constructor parameters for better maintainability.The constructor parameters mix core dependencies with optional configurations. Consider grouping required dependencies first, followed by optional parameters:
constructor( - private readonly roomManager: RoomManager, - vectorDb: VectorDB, - private processor: BaseProcessor, - orchestratorDb: OrchestratorDb, - config?: LoggerConfig + private readonly orchestratorDb: OrchestratorDb, + private readonly processor: BaseProcessor, + private readonly roomManager: RoomManager, + vectorDb: VectorDB, + config?: LoggerConfig )
346-369
: Refactor task scheduling logic into a separate method.The task scheduling logic within
runAutonomousFlow
is complex and would be more maintainable as a separate method.+private async scheduleTask( + userId: string, + task: { name: string; intervalMs?: number; data: unknown } +): Promise<void> { + const now = Date.now(); + const nextRunAt = new Date(now + (task.intervalMs ?? 0)); + + this.logger.info( + "Orchestrator.scheduleTask", + `Scheduling task ${task.name}`, + { + nextRunAt, + intervalMs: task.intervalMs, + } + ); + + await this.orchestratorDb.createTask( + userId, + task.name, + { + request: task.name, + task_data: JSON.stringify(task.data), + }, + nextRunAt, + task.intervalMs + ); +} // In runAutonomousFlow: -const now = Date.now(); -const nextRunAt = new Date(now + (task.intervalMs ?? 0)); - -this.logger.info( - "Orchestrator.runAutonomousFlow", - `Scheduling task ${task.name}`, - { - nextRunAt, - intervalMs: task.intervalMs, - } -); - -await this.orchestratorDb.createTask( - userId, - task.name, - { - request: task.name, - task_data: JSON.stringify(task.data), - }, - nextRunAt, - task.intervalMs -); +await this.scheduleTask(userId, task);
8-8
: Add JSDoc documentation for the OrchestratorDb interface.The
OrchestratorDb
interface is a crucial part of the system's architecture, but it lacks proper documentation. Consider adding comprehensive JSDoc comments to document the interface's purpose, methods, and expected behavior.+/** + * Interface for database operations related to orchestrator functionality. + * Implementations of this interface handle persistence of orchestrator state, + * messages, and scheduled tasks. + */ import type { OrchestratorDb } from "./memory";
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/core/src/core/orchestrator.ts
(18 hunks)packages/core/src/core/processors/master-processor.ts
(1 hunks)packages/core/src/core/processors/research-processor.ts
(0 hunks)
💤 Files with no reviewable changes (1)
- packages/core/src/core/processors/research-processor.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/core/processors/master-processor.ts
[error] 204-204: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
🔇 Additional comments (2)
packages/core/src/core/processors/master-processor.ts (2)
1-16
: LGTM! Clean and well-organized imports.The imports are properly organized, separating type imports from regular imports, and all imported entities are used in the implementation.
235-235
: Address the TODO comment regarding memory abstraction.The TODO comment indicates a need to fix the memory abstraction. This should be tracked and addressed.
Would you like me to help create an issue to track this TODO or propose a solution for the memory abstraction?
public canHandle(content: any): boolean { | ||
// Convert content to string for length check | ||
const contentStr = | ||
typeof content === "string" ? content : JSON.stringify(content); | ||
|
||
// Check if content is short enough for message processing (<1000 chars) | ||
return contentStr.length < this.contentLimit; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add type safety and error handling to content validation.
Several improvements could enhance the robustness of content handling:
- The
contentLimit
property is not defined in the visible code - JSON.stringify could throw on circular references
- The
content
parameter could benefit from type narrowing
+ protected readonly contentLimit = 1000; // Define the limit
+
public canHandle(content: any): boolean {
+ try {
// Convert content to string for length check
const contentStr =
typeof content === "string" ? content : JSON.stringify(content);
// Check if content is short enough for message processing (<1000 chars)
return contentStr.length < this.contentLimit;
+ } catch (error) {
+ this.logger.warn("canHandle failed", { error });
+ return false;
+ }
}
Committable suggestion skipped: line range outside the PR's diff.
const existingOrchestrator = | ||
await this.orchestratorDb.getOrchestratorById(orchestratorId); | ||
|
||
if (!existingOrchestrator) { | ||
orchestratorId = await this.mongoDb.createOrchestrator(userId); | ||
orchestratorId = | ||
await this.orchestratorDb.createOrchestrator(userId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential race condition in orchestrator creation.
There's a potential race condition between checking for an existing orchestrator and creating a new one. If multiple requests arrive simultaneously, this could lead to multiple orchestrators being created for the same ID.
Consider using a transaction or implementing proper locking:
-const existingOrchestrator =
- await this.orchestratorDb.getOrchestratorById(orchestratorId);
-
-if (!existingOrchestrator) {
- orchestratorId =
- await this.orchestratorDb.createOrchestrator(userId);
-}
+orchestratorId = await this.orchestratorDb.getOrCreateOrchestrator(
+ orchestratorId,
+ userId
+);
This would require adding a new atomic method getOrCreateOrchestrator
to the OrchestratorDb
interface.
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
packages/core/src/core/schedule-service.ts (1)
34-102
:⚠️ Potential issueHandle task failures with a fallback flow.
If an error occurs after calling
markRunning
but before callingmarkCompleted
, the task remains stuck in the "running" state.try { const tasks = await this.context.orchestratorDb.findDueTasks(); for (const task of tasks) { + try { await this.context.orchestratorDb.markRunning(task._id); const handler = this.orchestrator.getHandler(task.handlerName); ... + } catch (error) { + await this.context.orchestratorDb.markCompleted(task._id, true); + this.context.logger.error("Task processing error", "error", { taskId: task._id, error }); + continue; + } } } catch (err) { this.context.logger.error("pollTasks error", "error", { data: err }); }packages/core/src/core/orchestrator.ts (1)
290-294
:⚠️ Potential issueFix potential race condition in orchestrator creation.
There's a potential race condition between checking for an existing orchestrator and creating a new one.
Consider using a transaction or implementing proper locking:
-const existing = - await this.orchestratorDb.getOrchestratorById(orchestratorId); -if (!existing) { - orchestratorId = - await this.orchestratorDb.createOrchestrator(userId); -} +orchestratorId = await this.orchestratorDb.getOrCreateOrchestrator( + orchestratorId, + userId +);
🧹 Nitpick comments (4)
packages/core/src/core/schedule-service.ts (1)
14-32
: Add validation for the polling interval.The polling interval should be validated to ensure it's within reasonable bounds (e.g., not too short to avoid overwhelming the system, not too long to maintain responsiveness).
constructor( private context: IOrchestratorContext, private orchestrator: Orchestrator, - private pollMs: number = 10_000 + private pollMs: number = 10_000 ) { + if (pollMs < 1000) { + throw new Error('Polling interval must be at least 1 second'); + } + if (pollMs > 3600000) { + throw new Error('Polling interval must not exceed 1 hour'); + } }packages/core/src/core/types/index.ts (2)
288-288
: Document the nextProcessor property usage.The
nextProcessor
property enables processor chaining, but its usage pattern isn't immediately clear.Add JSDoc comments to explain:
- Expected format of the processor name
- How it's used in the processing pipeline
- Example usage
+ /** + * Name of the next processor to handle this result. + * @example "sentiment-analyzer" or "translation-processor" + */ nextProcessor?: string;
614-616
: Consider extending AgentRequest interface.The interface currently only includes headers, but might need to be extended in the future.
Consider adding commonly needed request properties:
export interface AgentRequest { headers: Record<string, string>; + method?: string; + path?: string; + query?: Record<string, string>; + body?: unknown; }packages/core/src/core/orchestrator.ts (1)
592-600
: Consider batching memory operations.The code performs multiple database operations sequentially when saving memories.
Consider batching the operations for better performance:
-await this.roomManager.addMemory( - content.room, - JSON.stringify(result.content), - { - source, - ...result.metadata, - ...result.enrichedContext, - } -); -await this.roomManager.markContentAsProcessed( - content.contentId, - content.room -); +await Promise.all([ + this.roomManager.addMemory( + content.room, + JSON.stringify(result.content), + { + source, + ...result.metadata, + ...result.enrichedContext, + } + ), + this.roomManager.markContentAsProcessed( + content.contentId, + content.room + ) +]);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
examples/example-api.ts
(4 hunks)examples/example-server.ts
(3 hunks)packages/core/src/core/orchestrator.ts
(21 hunks)packages/core/src/core/schedule-service.ts
(1 hunks)packages/core/src/core/types/index.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- examples/example-api.ts
🔇 Additional comments (6)
packages/core/src/core/schedule-service.ts (2)
7-12
: LGTM! Well-defined interface with clear dependencies.The interface clearly defines all required dependencies for the scheduler service.
134-139
: LGTM! Clean implementation of the stop method.The method properly checks for and cleans up the polling interval.
examples/example-server.ts (3)
20-21
: LGTM! Import paths updated to reflect new structure.The import paths have been correctly updated to match the new directory structure, with MongoDb moved to the db subdirectory and MasterProcessor added from the processors subdirectory.
55-68
: Verify processor initialization order.The code initializes both MasterProcessor and MessageProcessor correctly. However, ensure that any processor-specific configurations or state are properly transferred when adding MessageProcessor to MasterProcessor.
Consider documenting the processor hierarchy and initialization requirements to help maintainers understand the correct setup order.
157-162
: Ensure consistent user ID handling.The code now correctly passes user information through request headers. However, verify that all downstream components consistently extract and use the user ID from
x-user-id
header.Run the following script to verify consistent user ID handling:
✅ Verification successful
User ID handling is consistent across the codebase ✅
The
x-user-id
header is consistently used for passing and extracting user information, with proper fallback handling in the orchestrator component.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for consistent user ID extraction across the codebase # Search for x-user-id header usage rg -A 5 'x-user-id' # Search for userId extraction from headers ast-grep --pattern 'headers["x-user-id"]'Length of output: 2664
Script:
#!/bin/bash # Search for other potential user ID header patterns rg -i "user.?id.*header" -A 3 rg -i "header.*user.?id" -A 3 # Search for different header access patterns ast-grep --pattern 'headers.$_["user"]' ast-grep --pattern 'headers.$_["User"]' ast-grep --pattern 'get.*[Hh]eader.*[Uu]ser' # Search for direct userId access rg "getUserId|get_user_id" -A 3Length of output: 939
packages/core/src/core/orchestrator.ts (1)
Line range hint
26-62
: LGTM! Improved initialization and logging.The orchestrator initialization has been improved with better property documentation and consistent logging.
public async scheduleTaskInDb( | ||
userId: string, | ||
handlerName: string, | ||
data: Record<string, unknown> = {}, | ||
intervalMs?: number | ||
): Promise<string> { | ||
const now = Date.now(); | ||
const nextRunAt = new Date(now + (intervalMs ?? 0)); | ||
|
||
this.context.logger.info( | ||
"SchedulerService.scheduleTaskInDb", | ||
`Scheduling task ${handlerName}`, | ||
{ | ||
nextRunAt, | ||
intervalMs, | ||
} | ||
); | ||
|
||
return await this.context.orchestratorDb.createTask( | ||
userId, | ||
handlerName, | ||
{ | ||
request: handlerName, | ||
task_data: JSON.stringify(data), | ||
}, | ||
nextRunAt, | ||
intervalMs | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Validate handler existence before scheduling.
The method should verify that the handler exists before scheduling a task to prevent scheduling tasks for non-existent handlers.
public async scheduleTaskInDb(
userId: string,
handlerName: string,
data: Record<string, unknown> = {},
intervalMs?: number
): Promise<string> {
+ const handler = this.orchestrator.getHandler(handlerName);
+ if (!handler) {
+ throw new Error(`No handler registered with name: ${handlerName}`);
+ }
+
const now = Date.now();
const nextRunAt = new Date(now + (intervalMs ?? 0));
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
public async scheduleTaskInDb( | |
userId: string, | |
handlerName: string, | |
data: Record<string, unknown> = {}, | |
intervalMs?: number | |
): Promise<string> { | |
const now = Date.now(); | |
const nextRunAt = new Date(now + (intervalMs ?? 0)); | |
this.context.logger.info( | |
"SchedulerService.scheduleTaskInDb", | |
`Scheduling task ${handlerName}`, | |
{ | |
nextRunAt, | |
intervalMs, | |
} | |
); | |
return await this.context.orchestratorDb.createTask( | |
userId, | |
handlerName, | |
{ | |
request: handlerName, | |
task_data: JSON.stringify(data), | |
}, | |
nextRunAt, | |
intervalMs | |
); | |
} | |
public async scheduleTaskInDb( | |
userId: string, | |
handlerName: string, | |
data: Record<string, unknown> = {}, | |
intervalMs?: number | |
): Promise<string> { | |
const handler = this.orchestrator.getHandler(handlerName); | |
if (!handler) { | |
throw new Error(`No handler registered with name: ${handlerName}`); | |
} | |
const now = Date.now(); | |
const nextRunAt = new Date(now + (intervalMs ?? 0)); | |
this.context.logger.info( | |
"SchedulerService.scheduleTaskInDb", | |
`Scheduling task ${handlerName}`, | |
{ | |
nextRunAt, | |
intervalMs, | |
} | |
); | |
return await this.context.orchestratorDb.createTask( | |
userId, | |
handlerName, | |
{ | |
request: handlerName, | |
task_data: JSON.stringify(data), | |
}, | |
nextRunAt, | |
intervalMs | |
); | |
} |
// Possibly schedule any tasks in the DB | ||
if (processed.updateTasks) { | ||
for (const task of processed.updateTasks) { | ||
await this.scheduleTaskInDb( | ||
userId, | ||
task.name, | ||
task.data, | ||
task.intervalMs | ||
const now = Date.now(); | ||
const nextRunAt = new Date( | ||
now + (task.intervalMs ?? 0) | ||
); | ||
|
||
this.logger.debug( | ||
this.logger.info( | ||
"Orchestrator.runAutonomousFlow", | ||
"Scheduled task in DB", | ||
`Scheduling task ${task.name}`, | ||
{ nextRunAt, intervalMs: task.intervalMs } | ||
); | ||
|
||
await this.orchestratorDb.createTask( | ||
userId, | ||
task.name, | ||
{ | ||
task, | ||
} | ||
request: task.name, | ||
task_data: JSON.stringify(task.data), | ||
}, | ||
nextRunAt, | ||
task.intervalMs | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve task scheduling error handling.
The task scheduling code lacks error handling and validation.
Add error handling and validation:
if (processed.updateTasks) {
for (const task of processed.updateTasks) {
+ if (!task.name) {
+ this.logger.warn(
+ "Orchestrator.runAutonomousFlow",
+ "Skipping task with missing name"
+ );
+ continue;
+ }
+
+ if (task.intervalMs < 0) {
+ this.logger.warn(
+ "Orchestrator.runAutonomousFlow",
+ "Invalid interval, using default",
+ { taskName: task.name, invalidInterval: task.intervalMs }
+ );
+ task.intervalMs = 60000; // Default to 1 minute
+ }
+
const now = Date.now();
const nextRunAt = new Date(now + (task.intervalMs ?? 0));
- await this.orchestratorDb.createTask(
- userId,
- task.name,
- {
- request: task.name,
- task_data: JSON.stringify(task.data),
- },
- nextRunAt,
- task.intervalMs
- );
+ try {
+ await this.orchestratorDb.createTask(
+ userId,
+ task.name,
+ {
+ request: task.name,
+ task_data: JSON.stringify(task.data),
+ },
+ nextRunAt,
+ task.intervalMs
+ );
+ } catch (error) {
+ this.logger.error(
+ "Orchestrator.runAutonomousFlow",
+ "Failed to create task",
+ { taskName: task.name, error }
+ );
+ }
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Possibly schedule any tasks in the DB | |
if (processed.updateTasks) { | |
for (const task of processed.updateTasks) { | |
await this.scheduleTaskInDb( | |
userId, | |
task.name, | |
task.data, | |
task.intervalMs | |
const now = Date.now(); | |
const nextRunAt = new Date( | |
now + (task.intervalMs ?? 0) | |
); | |
this.logger.debug( | |
this.logger.info( | |
"Orchestrator.runAutonomousFlow", | |
"Scheduled task in DB", | |
`Scheduling task ${task.name}`, | |
{ nextRunAt, intervalMs: task.intervalMs } | |
); | |
await this.orchestratorDb.createTask( | |
userId, | |
task.name, | |
{ | |
task, | |
} | |
request: task.name, | |
task_data: JSON.stringify(task.data), | |
}, | |
nextRunAt, | |
task.intervalMs | |
); | |
// Possibly schedule any tasks in the DB | |
if (processed.updateTasks) { | |
for (const task of processed.updateTasks) { | |
if (!task.name) { | |
this.logger.warn( | |
"Orchestrator.runAutonomousFlow", | |
"Skipping task with missing name" | |
); | |
continue; | |
} | |
if (task.intervalMs < 0) { | |
this.logger.warn( | |
"Orchestrator.runAutonomousFlow", | |
"Invalid interval, using default", | |
{ taskName: task.name, invalidInterval: task.intervalMs } | |
); | |
task.intervalMs = 60000; // Default to 1 minute | |
} | |
const now = Date.now(); | |
const nextRunAt = new Date( | |
now + (task.intervalMs ?? 0) | |
); | |
this.logger.info( | |
"Orchestrator.runAutonomousFlow", | |
`Scheduling task ${task.name}`, | |
{ nextRunAt, intervalMs: task.intervalMs } | |
); | |
try { | |
await this.orchestratorDb.createTask( | |
userId, | |
task.name, | |
{ | |
request: task.name, | |
task_data: JSON.stringify(task.data), | |
}, | |
nextRunAt, | |
task.intervalMs | |
); | |
} catch (error) { | |
this.logger.error( | |
"Orchestrator.runAutonomousFlow", | |
"Failed to create task", | |
{ taskName: task.name, error } | |
); | |
} |
Summary by CodeRabbit
Refactor
New Features
Chores